[SPARK-23587][SQL] Add interpreted execution for MapObjects expression#20771
[SPARK-23587][SQL] Add interpreted execution for MapObjects expression#20771viirya wants to merge 8 commits intoapache:masterfrom
Conversation
|
Test build #88079 has finished for PR 20771 at commit
|
| // The data with PythonUserDefinedType are actually stored with the data type of its sqlType. | ||
| // When we want to apply MapObjects on it, we have to use it. | ||
| lazy private val inputDataType = inputData.dataType match { | ||
| case p: PythonUserDefinedType => p.sqlType |
There was a problem hiding this comment.
Please use the UserDefinedType super class here.
There was a problem hiding this comment.
(I just noticed that this wasn't introduced by you, but please change it anyway)
| override def eval(input: InternalRow): Any = { | ||
| assert(input.numFields == 1, | ||
| "The input row of interpreted LambdaVariable should have only 1 field.") | ||
| input.get(0, dataType) |
There was a problem hiding this comment.
Not a change for this PR. Maybe we should use accessors here? This uses a matching under the hood and is slower than virtual function dispatch. Implementing this would also be useful for BoundReference for example.
There was a problem hiding this comment.
You mean something like this?
lazy val accessor: InternalRow => Any = dataType match {
case IntegerType => (inputRow) => inputRow.getInt(0)
case LongType => (inputRow) => inputRow.getLong(0)
...
}
override def eval(input: InternalRow): Any = accessor(input)There was a problem hiding this comment.
Let's spin that off into a different ticket if we want to work on it.
There was a problem hiding this comment.
Ok. After this is merged, I will create another PR for it.
|
Test build #88087 has finished for PR 20771 at commit
|
| return inputCollection | ||
| } | ||
|
|
||
| val results = inputDataType match { |
There was a problem hiding this comment.
We shouldn't be doing this during eval. Please move this into a function val.
| executeFuncOnCollection(inputCollection.asInstanceOf[ArrayData].array) | ||
| } | ||
|
|
||
| customCollectionCls match { |
There was a problem hiding this comment.
We shouldn't be doing this during eval. Please move this into a function val.
| val inputCollection = inputData.eval(input) | ||
|
|
||
| if (inputCollection == null) { | ||
| return inputCollection |
There was a problem hiding this comment.
NIT: It is slightly cleared to return null here.
|
Test build #88114 has finished for PR 20771 at commit
|
|
retest this please. |
| private lazy val getResults: Seq[_] => Any = customCollectionCls match { | ||
| case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) => | ||
| // Scala sequence | ||
| _.toSeq |
| _.toSet | ||
| case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) => | ||
| // Java list | ||
| if (cls == classOf[java.util.List[_]] || cls == classOf[java.util.AbstractList[_]] || |
There was a problem hiding this comment.
IIUC you are matching against non concrete implementations of java.util.List? Maybe add this as documentation.
| _.asJava | ||
| } else { | ||
| (results) => { | ||
| val builder = Try(cls.getConstructor(Integer.TYPE)).map { constructor => |
There was a problem hiding this comment.
Can you try to do the constructor lookup only once? The duplication that that will cause is ok.
There was a problem hiding this comment.
Not sure if I understand correctly. Please check update again.
| x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala) | ||
| case ObjectType(cls) if cls == classOf[Object] => | ||
| (inputCollection) => { | ||
| if (inputCollection.getClass.isArray) { |
There was a problem hiding this comment.
(I am sorry for sounding like a broken record) But can we move this check out of the the function closure?
|
Test build #88119 has finished for PR 20771 at commit
|
|
Test build #88146 has finished for PR 20771 at commit
|
|
ping @hvanhovell |
| x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq) | ||
| case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) => | ||
| x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala) | ||
| case ObjectType(cls) if cls == classOf[Object] => |
There was a problem hiding this comment.
Ugghh... I know understand why this needed. RowEncoder does not pass the needed type information down: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala#L146
This obviously needs to be done during evaluation. You got it right in the previous commit. I am sorry for misunderstanding this, and making you move it. Next time please call me out on this!
| } else { | ||
| // Specifying concrete implementations of `java.util.List` | ||
| (results) => { | ||
| val constructors = cls.getConstructors() |
There was a problem hiding this comment.
Is there a way we can move the constructor resolution out of the closure? I am fine with some code duplication here :)...
| x => executeFuncOnCollection(x.asInstanceOf[Seq[_]]) | ||
| } | ||
| case ArrayType(et, _) => | ||
| x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array) |
There was a problem hiding this comment.
This will blow up with UnsafeArrayData :(... It would be nice if we can avoid copying the entire array. We could implement an ArrayData wrapper that implements Seq or Iterable (I slightly prefer the latter).
There was a problem hiding this comment.
Shall we implement this wrapper here, or a follow-up?
|
|
||
| private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = { | ||
| inputCollection.map { element => | ||
| val row = InternalRow.fromSeq(Seq(element)) |
There was a problem hiding this comment.
NIT reuse the row object.
| } | ||
|
|
||
| // Executes lambda function on input collection. | ||
| private lazy val executeFunc: Any => Seq[_] = inputDataType match { |
There was a problem hiding this comment.
I am wondering if we shouldn't just return an Iterator instead of a Seq? This seems a bit more flexible, allows us to avoid materializing an intermediate sequence. WDYT?
| } | ||
|
|
||
| // Converts the processed collection to custom collection class if any. | ||
| private lazy val getResults: Seq[_] => Any = customCollectionCls match { |
There was a problem hiding this comment.
Can you add a catch all clause that throws a nice exception to this match statement?
|
Test build #88696 has finished for PR 20771 at commit
|
|
Test build #88708 has finished for PR 20771 at commit
|
|
Test build #88709 has finished for PR 20771 at commit
|
|
ping @hvanhovell Do you have any more comments? Thanks. |
hvanhovell
left a comment
There was a problem hiding this comment.
LGTM. Merging tot master. Thanks!
|
Thanks @hvanhovell, I will open another ticket & PR for the accessors, based on #20771 (comment). |
|
@viirya can you also file a ticket for the |
|
@hvanhovell Sure, I will do it too. |
## What changes were proposed in this pull request? Add interpreted execution for `MapObjects` expression. ## How was this patch tested? Added unit test. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes apache#20771 from viirya/SPARK-23587.
|
@viirya to be clear: let's do this into two separate JIRA's/PRs. |
|
@hvanhovell Yes. I thought "do it together" will be confusing, so I changed it to "do it too" later. :) |
## What changes were proposed in this pull request? Add interpreted execution for `MapObjects` expression. ## How was this patch tested? Added unit test. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes apache#20771 from viirya/SPARK-23587.
What changes were proposed in this pull request?
Add interpreted execution for
MapObjectsexpression.How was this patch tested?
Added unit test.